Reduce memory usage of HiveSplitSource #9232

Merged (6 commits) on Nov 7, 2017

Contributor

@haozhun haozhun commented Oct 28, 2017

The first 3 commits introduce a new operation to AsyncQueue and do some other preparatory work. Can you please review, @dain?

@electrum, can you please review the last 3 commits?

  • The 4th commit ("Reduce memory usage of HiveSplitSource") uses the new operation to reduce the memory usage of HiveSplitSource.
  • The 5th commit ("Reduce loader concurrency in BackgroundHiveSplitLoader") fixes a minor inefficiency.
  • The 6th commit is a cleanup in the Hive connector that @electrum asked me to do.

Contributor

@dain dain left a comment

First three look good... some minor comments

else {
return Futures.transform(notEmptySignal, x -> getBatch(maxSize), executor);
ListenableFuture<List<T>> borrowedListFuture;
AtomicBoolean borrowerIncremented = new AtomicBoolean();
Contributor

I don't think you need this. You only increment when the list is not empty, so you can just check that in the transform below.

try {
BorrowResult<T, O> borrowResult;
borrowResult = function.apply(elements);
if (!borrowerIncremented.get()) {
Contributor

if (elements.isEmpty())
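
For readers following this thread, the borrow operation under review can be pictured roughly as follows. This is a minimal, synchronous sketch only: the class `BorrowSketch`, its nested `Result` type, and the choice to reinsert at the tail are assumptions made for illustration, while the operation in this PR is asynchronous and built on futures.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.function.Function;

// Hypothetical, synchronous simplification; not the AsyncQueue from this PR.
final class BorrowSketch<T> {
    // What the borrower hands back: elements to reinsert plus a result value.
    static final class Result<E, O> {
        final List<E> toInsert;
        final O value;

        Result(List<E> toInsert, O value) {
            this.toInsert = toInsert;
            this.value = value;
        }
    }

    private final Deque<T> elements = new ArrayDeque<>();

    synchronized void offer(T element) {
        elements.addLast(element);
    }

    synchronized int size() {
        return elements.size();
    }

    // Remove up to maxSize elements, let the function inspect them,
    // then reinsert whatever it returns (at the tail, by assumption).
    synchronized <O> O borrow(int maxSize, Function<List<T>, Result<T, O>> function) {
        List<T> borrowed = new ArrayList<>();
        while (borrowed.size() < maxSize && !elements.isEmpty()) {
            borrowed.add(elements.removeFirst());
        }
        Result<T, O> result = function.apply(borrowed);
        elements.addAll(result.toInsert);
        return result.value;
    }
}
```

In this sketch an empty `borrowed` list is simply passed to the function, which is the case the `elements.isEmpty()` check in the review comment is about.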

assertEquals(queue.getBatchAsync(100).get(), ImmutableList.of("1", "2", "3"));

queue.finish();
assertTrue(queue.isFinished());
Contributor

verify get after finish and maybe?

Contributor Author

This is tested in testOfferAfterFinish

assertTrue(queue.offer("2").isDone());
assertTrue(queue.offer("3").isDone());

assertFalse(queue.offer("4").isDone());
Contributor

if the queue is size 4, why doesn't this finish?

Contributor Author

It's full once you insert the 4th one. The returned future tells you whether it's ready to accept additional inputs.
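
The semantics described in this reply can be sketched as below: `offer` always accepts the element, and the returned future only says whether the queue is ready for further input. `TinyAsyncQueue` is a hypothetical stand-in written with `CompletableFuture` from the JDK; it is not the AsyncQueue from this PR, which uses Guava futures and an executor.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;

// Hypothetical stand-in for illustration; not Presto's AsyncQueue.
class TinyAsyncQueue<T> {
    private final int targetSize;
    private final Queue<T> elements = new ArrayDeque<>();
    private CompletableFuture<Void> notFull = CompletableFuture.completedFuture(null);

    TinyAsyncQueue(int targetSize) {
        this.targetSize = targetSize;
    }

    // Always accepts the element. The future reports whether the queue
    // is ready for MORE elements after this insertion.
    synchronized CompletableFuture<Void> offer(T element) {
        elements.add(element);
        if (elements.size() >= targetSize && notFull.isDone()) {
            notFull = new CompletableFuture<>();
        }
        return notFull;
    }

    // Removes up to maxSize elements and unblocks producers once there is room.
    synchronized List<T> getBatch(int maxSize) {
        List<T> batch = new ArrayList<>();
        while (batch.size() < maxSize && !elements.isEmpty()) {
            batch.add(elements.remove());
        }
        if (elements.size() < targetSize) {
            notFull.complete(null);
        }
        return batch;
    }
}
```

With a target size of 4, the first three offers return completed futures, and the fourth returns a pending one because the queue has just become full. That future completes once a consumer drains the queue.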

queue.offer("4");

queue.finish();
queue.offer("5");
Contributor

check return value

queue.offer(4);
queue.offer(5);

Runnable runnable = () -> {
Contributor

add a comment... remove up to three elements and reinsert them 700 times


AtomicBoolean done = new AtomicBoolean();
executor.submit(() -> {
if (!done.get()) {
Contributor

while?

AtomicBoolean done = new AtomicBoolean();
executor.submit(() -> {
if (!done.get()) {
assertFalse(queue.isFinished());
Contributor

I'm pretty sure this is a race

Contributor

assertFalse(queue.isFinished() || !done.get());


assertFalse(queue.isFinished());
ArrayList<Integer> list = new ArrayList<>(queue.getBatchAsync(100).get());
// 1 and 2 were removed by borrow call; 8 and 9 were never inserted because insertion happened after finish.
Contributor

I'd add a separate test for "8 and 9 were never inserted because insertion happened after finish"

Contributor Author

This is tested in testOfferAfterFinish

@dain dain removed their assignment Oct 30, 2017
Contributor Author

@haozhun haozhun left a comment

Addressed comments from @dain

Contributor Author

haozhun commented Nov 6, 2017

@electrum Please review the last 3 commits. They should be pretty straightforward.

Contributor

electrum commented Nov 7, 2017

"Remove connectorId" looks good

Contributor

electrum commented Nov 7, 2017

"Reduce loader concurrency" looks good

Contributor

@electrum electrum left a comment

I think InternalHiveSplit shouldn't have end

@@ -217,8 +222,9 @@ public void testOutstandingSplitSize()
100,
new Properties(),
ImmutableList.of(new HivePartitionKey("pk_col", "pk_value")),
ImmutableList.of(HostAddress.fromString("localhost")),
ImmutableList.of(new InternalHiveSplit.InternalHiveBlock(0, 100, ImmutableList.of(HostAddress.fromString("localhost")))),
Contributor

Qualifying InternalHiveBlock isn't needed

public int getEstimatedSizeInBytes()
{
// TODO! revisit
Contributor

Add more detail or remove this comment. It's not useful to readers in the current form


public InternalHiveBlock(long start, long end, List<HostAddress> addresses)
{
checkArgument(start <= end, "block must not have negative length");
Contributor

I'd say "block end cannot be before block start"

checkArgument(start <= end, "block must not have negative length");
this.start = start;
this.end = end;
this.addresses = addresses;
Contributor

Defensive copy
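
A minimal sketch of what the two suggestions on this constructor amount to, assuming a simplified class: `Block` is hypothetical and `String` stands in for `HostAddress`. It uses only the JDK. In a codebase that already depends on Guava, `ImmutableList.copyOf(addresses)` would be the usual way to make the copy.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical simplified block; String stands in for HostAddress.
final class Block {
    private final long start;
    private final long end;
    private final List<String> addresses;

    Block(long start, long end, List<String> addresses) {
        if (start > end) {
            throw new IllegalArgumentException("block end cannot be before block start");
        }
        this.start = start;
        this.end = end;
        // Defensive copy: later changes to the caller's list cannot leak in,
        // and the unmodifiable wrapper keeps callers from mutating ours.
        this.addresses = Collections.unmodifiableList(new ArrayList<>(addresses));
    }

    long getStart() {
        return start;
    }

    long getEnd() {
        return end;
    }

    List<String> getAddresses() {
        return addresses;
    }
}
```

Without the copy, a caller that keeps a reference to the list it passed in could change the block's addresses after construction.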

int result = INSTANCE_SIZE;
result += SizeOf.sizeOfObjectArray(addresses.size());
for (HostAddress address : addresses) {
result += HOST_ADDRESS_INSTANCE_SIZE + address.getHostText().length() * Character.BYTES;
Contributor

Add parens

Contributor Author

This is easier to read without parens

continue;
}
if (blockStart == blockEnd && !(blockStart == start && blockEnd == start + length)) {
// skip zero-width block, except in the special circumstance: slice is empty, and the block covers the empty slice interval.
Contributor

Why do we need a special case for this?

Contributor

This seems to be a special case to generate a split for an empty file. Why not just check that up front?

int chunks = toIntExact((long) Math.ceil((blockLocation.getLength() - chunkOffset) * 1.0 / maxBytes));
targetChunkSize = (long) Math.ceil((blockLocation.getLength() - chunkOffset) * 1.0 / chunks);
}
// TODO: remainingInitialSplits, maxInitialSplitSize, maxSplitSize
Contributor

What does this mean?
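
Separately from the TODO being questioned, the two chunk-size lines in the snippet above can be read as "split the remaining bytes into the fewest chunks that fit under the limit, then size them evenly". The sketch below restates that arithmetic on its own. `ChunkMath` and its parameter names are hypothetical, and it assumes `remainingBytes` and `maxBytes` are both positive.

```java
// Hypothetical helper restating the chunk arithmetic; assumes positive inputs.
final class ChunkMath {
    private ChunkMath() {}

    // Fewest chunks such that no chunk needs to exceed maxBytes.
    static int chunkCount(long remainingBytes, long maxBytes) {
        return Math.toIntExact((long) Math.ceil(remainingBytes * 1.0 / maxBytes));
    }

    // Evenly sized target, so the last chunk is not a small leftover.
    static long targetChunkSize(long remainingBytes, long maxBytes) {
        int chunks = chunkCount(remainingBytes, maxBytes);
        return (long) Math.ceil(remainingBytes * 1.0 / chunks);
    }
}
```

For example, with 100 bytes remaining and a 64-byte limit, this gives 2 chunks with a target size of 50 each, rather than one chunk of 64 and one of 36.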

{
checkArgument(!blocks.isEmpty());
checkArgument(start == blocks.get(0).getStart());
checkArgument(start + length == blocks.get(blocks.size() - 1).getEnd());
Contributor

Why doesn't this fail for the first block?

Contributor Author

I don't understand the question

catch (IOException e) {
throw new UncheckedIOException(e);
}
for (String host : hosts) {
Contributor

return Arrays.stream(hosts)
        .map(HostAddress::fromString)
        .collect(toImmutableList());

}
estimatedSplitSizeInBytes.addAndGet(-removedEstimatedSizeInBytes);

ImmutableList<InternalHiveSplit> splitsToInsert = splitsToInsertBuilder.build();
Contributor

Declare as List

Previously, HiveSplitSource exposed the outstanding split count by querying the
size of the underlying AsyncQueue. The AsyncQueue.size method needs to be
removed, as its definition will become ambiguous with the addition of a new
method in the next commit.
@haozhun haozhun force-pushed the hive-split branch 2 times, most recently from e9bc73e to da6fee1 Compare November 7, 2017 23:05
Use each InternalHiveSplit to represent a file, instead of part of a file.
Each InternalHiveSplit can now correspond to multiple HiveSplits.
Responsibility to produce splits of appropriate size has been moved from
BackgroundHiveSplitLoader to HiveSplitSource.
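
One way to picture the per-file design described in this commit message is a small cursor object that stays in the queue and has splits carved off it on demand. The sketch below is an assumption-laden illustration: `FileCursor`, `nextSplit`, and the `{offset, length}` array are hypothetical names and shapes, not the actual InternalHiveSplit or HiveSplit classes.

```java
// Hypothetical per-file cursor; not the actual InternalHiveSplit.
final class FileCursor {
    private final long fileLength;
    private long start; // offset of the next byte not yet handed out

    FileCursor(long fileLength) {
        this.fileLength = fileLength;
    }

    boolean isDone() {
        return start >= fileLength;
    }

    // Carve off the next split of at most maxSplitBytes.
    // Returns {offset, length}; assumes maxSplitBytes is positive.
    long[] nextSplit(long maxSplitBytes) {
        long length = Math.min(maxSplitBytes, fileLength - start);
        long[] split = {start, length};
        start += length;
        return split;
    }
}
```

Under this reading, the memory saving would come from the queue holding one compact object per file instead of one object per split, with the split source producing appropriately sized splits only when they are requested.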

Previously, BackgroundHiveSplitLoader used maxPartitionBatchSize as
the maximum number of concurrent threads per split source.
The default value for the configuration parameter is 100.
This is wasteful and unnecessary.

Before the previous rewrite of BackgroundHiveSplitLoader, the number of
threads was not directly bounded, but was finite due to other constraints.
At the time, the maximum possible number of threads was chosen.
@vgankidi

@haozhun We're seeing a performance regression with the reduced loader concurrency in BackgroundHiveSplitLoader: https://groups.google.com/forum/#!topic/presto-users/AlEr1jq-R0E
Although this config is tunable, what do you think about bumping the default value a bit higher? 4 split loader threads per HiveSplitSource seems low for S3 listings.

5 participants